package com.cloudansys.core.flink.sink;

import com.alibaba.fastjson.JSON;
import com.cloudansys.config.DefaultConfig;
import com.cloudansys.core.constant.Const;
import com.cloudansys.core.entity.LiftDataEntity;
import com.cloudansys.core.entity.MultiDataEntity;
import com.cloudansys.core.util.*;
import com.cloudansys.hawkeye.modules.settings.model.vo.PrimaryWarn;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.eclipse.paho.client.mqttv3.MqttClient;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.Pipeline;

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

@Slf4j
public class WarnDetector extends RichSinkFunction<List<MultiDataEntity>> {

    private static String pid;
    private static Jedis jedis;
    private static String topic_data_xw;
    private static String topic_data_yl;
    private static String topic_data_we;
    private static String topic_data_ts;
    private static String topic_warn;
    private static String redisWarnsKey;
    private static MqttClient mqttClient;
    private static long lastTimeMqtt = 0;
    private static int count = 0;
    private static Pipeline pipeline;
    private static long lastTimeRedis = 0;
    private SimpleDateFormat format = new SimpleDateFormat(Const.FMT_TRIM_MILLI);

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        jedis = DefaultConfig.getJedis();
        pipeline = DefaultConfig.getPipeline();
        pid = jedis.get(Const.PID);
        topic_data_xw = DefaultConfig.get(Const.MQTT_TOPIC_DATA_XW);
        topic_data_yl = DefaultConfig.get(Const.MQTT_TOPIC_DATA_YL);
        topic_data_we = DefaultConfig.get(Const.MQTT_TOPIC_DATA_WE);
        topic_data_ts = DefaultConfig.get(Const.MQTT_TOPIC_DATA_TS);
        topic_warn = DefaultConfig.get(Const.MQTT_TOPIC_WARN);
        redisWarnsKey = JedisUtil.getWarnKey();
        mqttClient = DefaultConfig.getMqttClient();
    }

    /**
     * 阈值告警
     * 告警信息推送 mqtt
     * 告警信息缓存到 redis
     * 秒级瞬时值（包含告警信息）存储到 redis
     */
    @Override
    public void invoke(List<MultiDataEntity> element, Context context) {
//        log.info("element: {}", element);
        for (MultiDataEntity multiDataEntity : element) {
            String targetType = multiDataEntity.getTargetType();
            /* 实时推送 */
            sendToMqtt(multiDataEntity);

            /* 阈值告警判断 */
            // 非倾角类型
            if (!targetType.equals(Const.TT_QJ)) {
                // 判断是否产生了告警，告警则推送 mqtt
                List<PrimaryWarn> warnList = WarnUtil.judgeWarn(jedis, multiDataEntity);
//            log.info("============warnSize: {}", warnList.size());
                if (warnList.size() != 0) {
                    // 告警信息存储到 redis 中
//                saveWarnToRedis(multiDataEntity);

                    // 告警推送
//                    sendWithFixedDelay(warnList);
                    sendWithoutDelay(warnList);
                }
            }
        }

    }

    /**
     * 数据推送
     */
    private void sendToMqtt(MultiDataEntity multiDataEntity) {
        try {
            LiftDataEntity targetInfo = getTargetInfo(multiDataEntity);
            if (targetInfo == null) {
                return;
            }
            // topic
            String projectId = String.valueOf(targetInfo.getProjectId());
            String serialCode = targetInfo.getSerialCode();
            int sc = Integer.parseInt(serialCode);
            String topic = null;
            // 限位装置位移
            if (sc >= 54 && sc <= 71) {
                topic = StrUtil.stringsToStr(Const.SLASH, topic_data_xw, projectId, serialCode);
            }
            // 局部应力
            if (sc >= 7 && sc <= 30) {
                topic = StrUtil.stringsToStr(Const.SLASH, topic_data_yl, projectId, serialCode);
            }
            // 风叶夹角
            if (sc == 72) {
                topic = StrUtil.stringsToStr(Const.SLASH, topic_data_we, projectId, serialCode);
            }
            // 当前轮毂高度
            if (sc == 73) {
                topic = StrUtil.stringsToStr(Const.SLASH, topic_data_ts, projectId, serialCode);
            }
            if (topic == null) {
                return;
            }
            // 数据
            String jsonString = JSON.toJSONString(targetInfo);

            // 向 mqtt 推送数据
            MqttUtil.sendToMqtt(mqttClient, topic, jsonString);
//            log.info("##### send to mqtt success #####");
        } catch (Exception e) {
            log.info("##### send to mqtt failed #####");
            log.error("errMsg: " + e.getMessage());
            e.printStackTrace();
        }
    }

    @Override
    public void close() throws Exception {
        super.close();
        if (jedis != null) {
            jedis.close();
        }
        if (mqttClient != null) {
            mqttClient.disconnect();
            mqttClient.close();
        }
    }

    /**
     * 控制批量写入 redis 的频率
     * 每读取十万条数据或者每隔 500ms 提交到 Redis 存储一次
     */
    private void flushToRedis() {
        long thisTime = System.nanoTime();
        long diffTime = (thisTime - lastTimeRedis) / 1000000;
        if (count > 10000 || diffTime > 1000) {
//            log.info("count: {}====diffTime: {}", count, diffTime);
            pipeline.sync();
            count = 0;
            lastTimeRedis = System.nanoTime();
            log.info("##### write in redis success #####");
        }
    }

    /**
     * 把告警信息存储到 redis 中，并设置有效期（30分钟）
     * 为了判断传感器告警状态
     */
    private void saveWarnToRedis(MultiDataEntity multiDataEntity) {
        // 构造 redis 存储 key
        String projectId = multiDataEntity.getProjectId();
        String serialCode = multiDataEntity.getSerialCode();
        List<String> list = new ArrayList<>();
        list.add(Const.HK_SM_WARN);
        list.add(projectId);
        list.add(serialCode);
        String redisKey = JedisUtil.formatSmKey(list);
        jedis.set(redisKey.getBytes(), SerializeUtil.serialize(multiDataEntity));
        jedis.expire(redisKey, 1800);
    }

    /**
     * 秒级瞬时值存储到 redis
     */
    private void saveInstantToRedis(MultiDataEntity multiDataEntity) {
        /* ------------- 把当前提升阶段和轮毂提升高度实时更新到 redis 中 ------------- */
        String projectId = multiDataEntity.getProjectId();
        // 把当前的项目id（即提升阶段）缓存在 redis 中
        if (pid == null || !pid.equals(projectId)) {
            jedis.set(Const.PID, projectId);
        }

        /* ------------- 把各个传感器的秒级瞬时值实时更新到 redis 中 ------------- */
        String serialCode = multiDataEntity.getSerialCode();
        long timestamp = 0;
        try {
            timestamp = format.parse(multiDataEntity.getPickTime()).getTime();
        } catch (ParseException e) {
            log.error("日期格式解析失败！");
            e.printStackTrace();
        }

        // 构造 redis 存储 key
        List<String> list = new ArrayList<>();
        list.add(Const.HK_SM);
        list.add(projectId);
        list.add(serialCode);
        String redisKey = JedisUtil.formatSmKey(list);

        // 构造 redis 存储 value 【hash 类型】
        Double[] values = multiDataEntity.getValues();
        Map<String, String> valueMap = new HashMap<>();
        valueMap.put(Const.HK_SM_TS, String.valueOf(timestamp));
        for (int i = 0; i < values.length; i++) {
            // v1 v2 v3 ...
            String fieldName = Const.HK_SM_VAL + (i + 1);
            String fieldValue = String.valueOf(values[i]);
            valueMap.put(fieldName, fieldValue);
        }
//        log.info("valueMap: {}", valueMap);
        pipeline.hmset(redisKey, valueMap);
        pipeline.expire(redisKey, 5);
        count++;
    }

    /**
     * 控制告警推送频率【测试暂用】
     */
    private void sendWithFixedDelay(List<PrimaryWarn> warnList) {
//        log.info("warn: {}", warnList);
        for (PrimaryWarn primaryWarn : warnList) {
            // 每隔300秒处理一次告警【测试暂定】
            long thisTime = System.currentTimeMillis();
            long diffTime = (thisTime - lastTimeMqtt) / 1000;
            if (diffTime > 30) {
                warnDealer(primaryWarn);
                lastTimeMqtt = System.currentTimeMillis();
            }
        }
    }

    /**
     * 无延迟控制告警推送
     */
    private void sendWithoutDelay(List<PrimaryWarn> warnList) {
        for (PrimaryWarn primaryWarn : warnList) {
            warnDealer(primaryWarn);
        }
    }

    /**
     * 告警处理
     *
     * @param primaryWarn 告警对象
     */
    private void warnDealer(PrimaryWarn primaryWarn) {
        try {
            // 设置告警时当前的提升高度
            double currentHeight = Double.parseDouble(jedis.get(Const.TS_REDIS));
            primaryWarn.setCurrentHeight(currentHeight);
            // topic
            String projectId = String.valueOf(primaryWarn.getProjectId());
            String sc = String.valueOf(primaryWarn.getTargetId());
            String topic = StrUtil.stringsToStr(Const.SLASH, topic_warn, projectId, sc);
            // json 对象
            String jsonString = JSON.toJSONString(primaryWarn);
            log.info("primaryWarn: {}", jsonString);
            publish(topic, jsonString);

            // 告警信息同时存储到 redis 缓存中
            jedis.lpush(redisWarnsKey, jsonString);
            log.info("##### save to redis success #####");
        } catch (Exception e) {
            log.info("##### save to redis failed #####");
            log.error("errMsg: " + e.getMessage());
            e.printStackTrace();
        }
    }

    /**
     * 向 mqtt 指定主题发布消息
     *
     * @param topic 发布的主题名称
     * @param msg   发布的消息
     */
    private void publish(String topic, String msg) {
        try {
            // 把从 redis 缓存中获取的数据推送到 mqtt
            MqttUtil.sendToMqtt(mqttClient, topic, msg);
//            log.info("##### send 【{}】 to mqtt success #####", topic);
        } catch (Exception e) {
            log.info("##### send 【{}】 to mqtt failed #####", topic);
            log.error("errMsg: " + e.getMessage());
            e.printStackTrace();
        }
    }

    /**
     * 获取 redis 缓存中对应的传感器信息
     * 【key -> hk:bi:tgt:projectId_serialCode】，
     */
    private LiftDataEntity getTargetInfo(MultiDataEntity multiDataEntity) {
        String projectId = multiDataEntity.getProjectId();
        String serialCode = multiDataEntity.getSerialCode();
        String pickTime = multiDataEntity.getPickTime();
        Double[] values = multiDataEntity.getValues();
        // 获取 redis 缓存中对应的传感器信息
        String tgtKey = "hk:bi:tgt:" + projectId + Const.BAR_BOTTOM + serialCode;
        String s = jedis.get(tgtKey);
        if (s == null) {
            log.error("获取传感器基础信息失败！redisKey 为：{}", tgtKey);
            return null;
        }
        LiftDataEntity liftDataEntity = (LiftDataEntity) SerializeUtil.deserialize(s.getBytes());
        // 更新数据和数据时间
        liftDataEntity.setQuotaValues(values);
        liftDataEntity.setPickTime(CustomDateUtil.formatToStdTime(pickTime));
        return liftDataEntity;
    }

}
